-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate Asn1EncodedDataRouter to use Spring Kafka #138
base: dev
Are you sure you want to change the base?
Conversation
Generated by sending tim to deposit tim endpoint and grabbing value produced to the `topic.Asn1EncoderOutput` topic via kafka-ui
…TimUnsecured_depositsToSdxTopicAndTimTmcFiltered
…ve DI and testability
…tImpl These can be used to wrap the calls to security services and allow easier mocking of responses. It also allows us to better shield ourselves from API contract changes. users of the client don't need to be aware of changes to the external API as all interactions can be encapsulated within the client
This moves the calls to security services from Asn1CommandManager to SecurityServicesClientImpl which allows for more testability, flexibility (mocking client calls), and modularity
…ether (make data dynamic)
…h refactor nor do they confirm behavior
The responsibility to submit data to topics is already contained within Asn1EncodedDataRouter, and Asn1CommandManager is not responsible for any kafka interactions otherwise, so the responsibility was moved to Asn1EncodedDataRouter
Updated method signatures and instance usage to specify ResponseEvent with Address as the generic type. This change improves type safety and aligns with best practices for clearer code readability and maintainability.
…ests - not working well with other tests
Added `RestClientException` to `signMessage` for better error handling. Implemented a new test case to verify behavior when the signing service returns a null response.
Simplified `getServiceRequest` method by directly passing the metadata JSON object instead of the full consumed object. Updated relevant calls to align with this change, improving clarity and reducing redundant operations.
Removed verbose and redundant JavaDoc comments, replacing them with concise inline comments to improve code readability. This change ensures that key processing logic remains documented without unnecessary detail duplication.
0bc948a
to
0d30254
Compare
Removed redundant null checks before calling `sendToRsus` in the main logic. The null checks are properly handled within the `sendToRsus` method, ensuring clearer and more maintainable code.
Standardizing on the ObjectMapper provided in the app context will reduce variability and decrease risk of unintended differences in serdes behavior
This name better represents the behavior of the method. The Javadocs were updated to be accurate as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend reviewing the Asn1EncodedDataRouter code from within your IDE. There are a lot of changes in there that don't necessarily work well with GitHub's viewer. The processing flow should not have changed. The changes in that file should be in one of four categories:
- Deduplication of JSON processing
- Deduplication of message signing or encoding
- Using the SerializationConfig-provided Object Mapper instead of JsonUtils or TimTransmorgrifier or other JSON serdes (there were a few weird ones).
- Extracting logic into named methods to reduce cognitive complexity
return streams.state().isRunningOrRebalancing(); | ||
return streams.state().equals(KafkaStreams.State.RUNNING); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this method is only used in tests for now, and in the tests, a state of REBALANCING
would cause an error. Addressing this possible bug may be worth the effort in a follow-up PR or separate work item. Still, I don't believe the changes belong here since I didn't change how the OdeTimJsonTopology works in this PR.
kafkaTemplate.send(jsonTopics.getTim(), consumerRecord.key(), odeTimData); | ||
kafkaTemplate.send(jsonTopics.getTim(), streamId, odeTimData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: the key is empty when consuming from the asn1 decoder output topic. We need to include the streamId as the key when publishing TIM data so that the downstream consumers can use the streamId to lookup the TIM JSON from the K-Table in OdeTimJsonTopology. If we don't produce with a streamId (which we conditionally set a few lines above this method)) then we will never produce to the TMCFiltered topic in the Asn1EncodedDataRouter
if (metadata.has(TimTransmogrifier.REQUEST_STRING)) { | ||
JSONObject request = metadata.getJSONObject(TimTransmogrifier.REQUEST_STRING); | ||
if (request.has(TimTransmogrifier.RSUS_STRING)) { | ||
Object rsus = request.get(TimTransmogrifier.RSUS_STRING); | ||
if (rsus instanceof JSONObject) { | ||
JSONObject rsusIn = (JSONObject) request.get(TimTransmogrifier.RSUS_STRING); | ||
if (rsusIn.has(TimTransmogrifier.RSUS_STRING)) { | ||
Object rsu = rsusIn.get(TimTransmogrifier.RSUS_STRING); | ||
JSONArray rsusOut = new JSONArray(); | ||
if (rsu instanceof JSONArray) { | ||
log.debug("Multiple RSUs exist in the request: {}", request); | ||
JSONArray rsusInArray = (JSONArray) rsu; | ||
for (int i = 0; i < rsusInArray.length(); i++) { | ||
rsusOut.put(rsusInArray.get(i)); | ||
} | ||
request.put(TimTransmogrifier.RSUS_STRING, rsusOut); | ||
} else if (rsu instanceof JSONObject) { | ||
log.debug("Single RSU exists in the request: {}", request); | ||
rsusOut.put(rsu); | ||
request.put(TimTransmogrifier.RSUS_STRING, rsusOut); | ||
} else { | ||
log.debug("No RSUs exist in the request: {}", request); | ||
request.remove(TimTransmogrifier.RSUS_STRING); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: as far as I can tell by reading the code and stepping through it with a debugger, this code block did nothing. The request
variable is manipulated but never used after line 153. Please take a second look at this change. I'm fairly certain it can be safely deleted, but I may be missing something
+ consumedData; | ||
if (log.isDebugEnabled()) { | ||
// print error message and stack trace | ||
EventLogger.logger.error(msg, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Does anyone know if there was a purpose to the EventLogger? Do we consume data from this logger differently?
Date requiredExpirationDate = new Date(); | ||
requiredExpirationDate.setTime(timTimestamp.getTime() + maxDurationTime); | ||
timWithExpiration.put("requiredExpirationDate", dateFormat.format(requiredExpirationDate)); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is one of the rare times in this code where we can safely continue (as far as I can tell from the code) processing a message should an Exception be thrown. I intentionally handled the expectation here and allowed the processing to continue. I couldn't find any downstream issues in the code. Also, please note that we already handled the exception and continued processing in the previous implementation. This does not change the code flow.
} catch (Exception e) { | ||
log.error("Unable to get expiration date from signed messages response. Setting expirationData to 'null'", e); | ||
timWithExpiration.put("expirationDate", "null"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this is one of the rare times in this code where we can safely continue (as far as I can tell from the code) processing a message should an Exception be thrown. I intentionally handled the expectation here and allowed the processing to continue. I couldn't find any downstream issues in the code. Also, please note that we already handled the exception and continued processing in the previous implementation. This does not change the code flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This code is heavily tested indirectly by the Asn1EncodedDataRouter tests. The existing tests were not adding much value (I wrote most of them, so my bad), and meaningful tests were difficult to write without duplicating much of the testing already performed in the Asn1EncodedDataRouter tests.
…for Kafka container setup Replaced ConcurrentMessageListenerContainer with KafkaMessageListenerContainer and removed KafkaConsumerConfig dependency. This fixes an issue where we would intermittently encounter the following error when running tests: java.lang.IllegalStateException: Expected 1 but got 0 partitions at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:85)
Updated the expiration date calculation to use `Instant` and ensure accuracy by handling milliseconds properly. Adjusted test cases and mock classes to align with the updated logic. Fixed incorrect test data to reflect the corrected expiration date format.
Replaced the outdated `SimpleDateFormat` with the modern `DateTimeFormatter` to handle date formatting and parsing. This improves thread safety and code readability while ensuring alignment with Java's modern date/time APIs. Adjusted test resources to reflect updated timestamp formatting logic.
Add exception handling to catch RestClientException when signing TIM messages. Log detailed error messages, including specific handling for HttpClientErrorException.NotFound, to provide better debugging information and highlight potential misconfiguration of jpo-security-svcs.
Convert TIM start flag to uppercase for consistency and improve logging to dynamically include the expected start flag value. This enhances clarity and ensures robust matching during header stripping.
Consolidated signing logic into `depositToTimCertExpirationTopic` and removed redundant `signTimWithExpiration` method. Added utility for obtaining hex-encoded signed messages in `SignatureResultModel` to streamline encoding operations.
If we fail to sign the message, we should fail to process the message further. We don't want any unsigned messages in our system (if signing is enabled)
…/spring-kafka/asn1-encoded-router
…/spring-kafka/asn1-encoded-router
PR Details
Description
Date
class with usages ofLocalDateTime
(only in Asn1EncodedDataRouter) to allow for explicit usage of UTC timezone instead of relying on the system settings (they aren't consistent between local and CI, so it's reasonable to suspect there is inconsistency between hosting environments)Related Issue
No related USDOT issue
Motivation and Context
Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust production-ready Kafka library. This is part of a more significant effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are:
How Has This Been Tested?
For the live testing, I set the following environment variables on my system to enable signing and confirm all pieces played well together
To test the signing flow, I produced the test XML to the
topic.Asn1EncoderOutput
topic via the kafka-ui.Snippet from the locals during local testing:
Types of changes
Checklist:
ODE Contributing Guide
Helpful Documentation